﻿using Microsoft.Extensions.Options;
using Naruto.Redis.Config;
using Naruto.Redis.Connection;
using Naruto.Redis.Object;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Naruto.Redis.Internal
{
    /// <summary>
    /// Redis Stream operations (add, acknowledge, delete, consumer-group management and
    /// group reads) executed against the database selected by the injected
    /// <see cref="IRedisConnection"/>.
    /// </summary>
    /// <remarks>
    /// The asynchronous members accept a <see cref="CancellationToken"/>, but it is only
    /// checked before the command is issued; it is not handed to the underlying
    /// StackExchange.Redis calls, so a command that is already in flight is not aborted.
    /// </remarks>
    public class RedisStream : Interface.IRedisStream
    {
        /// <summary>
        /// Connection abstraction used to obtain the database every command runs against.
        /// </summary>
        private readonly IRedisConnection  redisConnection;

        /// <summary>
        /// Creates the stream helper on top of an existing connection.
        /// </summary>
        /// <param name="_redisConnection">Connection used for every command.</param>
        /// <exception cref="ArgumentNullException"><paramref name="_redisConnection"/> is null.</exception>
        public RedisStream(IRedisConnection _redisConnection)
        {
            // Fail at construction time instead of with a NullReferenceException on first use.
            redisConnection = _redisConnection ?? throw new ArgumentNullException(nameof(_redisConnection));
        }

        /// <summary>
        /// Throws when a required string argument is null, empty or whitespace.
        /// </summary>
        /// <param name="value">Argument value to check.</param>
        /// <param name="paramName">Name of the argument, reported in the exception.</param>
        /// <exception cref="ArgumentNullException"><paramref name="value"/> is null, empty or whitespace.</exception>
        private static void RequireText(string value, string paramName)
        {
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new ArgumentNullException(paramName);
            }
        }

        /// <summary>
        /// Acknowledges a single message for a consumer group.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <param name="messageId">Id of the message to acknowledge.</param>
        /// <returns><c>true</c> when the acknowledged count reported by Redis is greater than zero.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null, empty or whitespace.</exception>
        public bool Ack(string key, string groupName, string messageId)
        {
            RequireText(key, nameof(key));
            RequireText(groupName, nameof(groupName));
            RequireText(messageId, nameof(messageId));

            var database = redisConnection.Data(redisConnection.DataBase);
            return database.StreamAcknowledge(key, groupName, messageId) > 0;
        }

        /// <summary>
        /// Asynchronously acknowledges a single message for a consumer group.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <param name="messageId">Id of the message to acknowledge.</param>
        /// <param name="cancellationToken">Checked once before the command is sent.</param>
        /// <returns><c>true</c> when the acknowledged count reported by Redis is greater than zero.</returns>
        /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
        /// <exception cref="ArgumentNullException">Any string argument is null, empty or whitespace.</exception>
        public async Task<bool> AckAsync(string key, string groupName, string messageId, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            RequireText(key, nameof(key));
            RequireText(groupName, nameof(groupName));
            RequireText(messageId, nameof(messageId));

            var database = redisConnection.Data(redisConnection.DataBase);
            var acknowledged = await database.StreamAcknowledgeAsync(key, groupName, messageId).ConfigureAwait(false);
            return acknowledged > 0;
        }

        /// <summary>
        /// Appends an entry made of the given field/value pairs to a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="values">Field names and raw values of the entry; must contain at least one pair.</param>
        /// <param name="messageId">
        /// Explicit entry id. When null, no id is passed on to StackExchange.Redis
        /// (which, per its documentation, then uses an id generated by the server).
        /// </param>
        /// <returns>The id of the added entry, converted to a string.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="key"/> is null, empty or whitespace, or <paramref name="values"/> is null or empty.
        /// </exception>
        public string Add(string key, IDictionary<string, byte[]> values, string messageId = null)
        {
            RequireText(key, nameof(key));
            if (values == null || values.Count <= 0)
            {
                throw new ArgumentNullException(nameof(values));
            }

            RedisValue? _messageId = messageId == null ? null : new RedisValue?(messageId);
            var entries = values.Select(a => new NameValueEntry(a.Key, a.Value)).ToArray();

            var database = redisConnection.Data(redisConnection.DataBase);
            return database.StreamAdd(new StackExchange.Redis.RedisKey(key), entries, _messageId);
        }

        /// <summary>
        /// Asynchronously appends an entry made of the given field/value pairs to a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="values">Field names and raw values of the entry; must contain at least one pair.</param>
        /// <param name="messageId">
        /// Explicit entry id. When null, no id is passed on to StackExchange.Redis
        /// (which, per its documentation, then uses an id generated by the server).
        /// </param>
        /// <param name="cancellationToken">Checked once before the command is sent.</param>
        /// <returns>The id of the added entry, converted to a string.</returns>
        /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="key"/> is null, empty or whitespace, or <paramref name="values"/> is null or empty.
        /// </exception>
        public async Task<string> AddAsync(string key, IDictionary<string, byte[]> values, string messageId = null, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            RequireText(key, nameof(key));
            if (values == null || values.Count <= 0)
            {
                throw new ArgumentNullException(nameof(values));
            }

            RedisValue? _messageId = messageId == null ? null : new RedisValue?(messageId);
            var entries = values.Select(a => new NameValueEntry(a.Key, a.Value)).ToArray();

            var database = redisConnection.Data(redisConnection.DataBase);
            return await database.StreamAddAsync(new StackExchange.Redis.RedisKey(key), entries, _messageId).ConfigureAwait(false);
        }

        /// <summary>
        /// Ensures a consumer group exists on a stream.
        /// </summary>
        /// <remarks>
        /// The existence check and the creation are two separate commands, so two callers
        /// racing on the same group can both reach the create call.
        /// </remarks>
        /// <param name="key">Stream key.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <returns>
        /// <c>true</c> when a group with that name is already listed for the stream;
        /// otherwise the result of the create command.
        /// </returns>
        /// <exception cref="ArgumentNullException">Any argument is null, empty or whitespace.</exception>
        public bool CreateConsumerGroup(string key, string groupName)
        {
            RequireText(key, nameof(key));
            RequireText(groupName, nameof(groupName));

            // Skip creation when the group is already there; a null list means the
            // lookup produced nothing (or failed), in which case creation is attempted.
            var existingGroups = GetConsumerGroupInfo(key);
            if (existingGroups != null && existingGroups.Any(a => a.Name.Equals(groupName)))
            {
                return true;
            }

            var database = redisConnection.Data(redisConnection.DataBase);
            return database.StreamCreateConsumerGroup(key, groupName);
        }

        /// <summary>
        /// Asynchronously ensures a consumer group exists on a stream.
        /// </summary>
        /// <remarks>
        /// The existence check and the creation are two separate commands, so two callers
        /// racing on the same group can both reach the create call.
        /// </remarks>
        /// <param name="key">Stream key.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <param name="cancellationToken">Checked on entry and again by the group lookup.</param>
        /// <returns>
        /// <c>true</c> when a group with that name is already listed for the stream;
        /// otherwise the result of the create command.
        /// </returns>
        /// <exception cref="OperationCanceledException">The token was cancelled before a command was sent.</exception>
        /// <exception cref="ArgumentNullException">Any string argument is null, empty or whitespace.</exception>
        public async Task<bool> CreateConsumerGroupAsync(string key, string groupName, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            RequireText(key, nameof(key));
            RequireText(groupName, nameof(groupName));

            // Skip creation when the group is already there; the token is forwarded so
            // the lookup honours cancellation as well.
            var existingGroups = await GetConsumerGroupInfoAsync(key, cancellationToken).ConfigureAwait(false);
            if (existingGroups != null && existingGroups.Any(a => a.Name.Equals(groupName)))
            {
                return true;
            }

            var database = redisConnection.Data(redisConnection.DataBase);
            return await database.StreamCreateConsumerGroupAsync(key, groupName).ConfigureAwait(false);
        }

        /// <summary>
        /// Deletes the given messages from a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="messageIds">Ids of the messages to delete; must contain at least one id.</param>
        /// <returns><c>true</c> when the deleted count reported by Redis is greater than zero.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="key"/> is null, empty or whitespace, or <paramref name="messageIds"/> is null or empty.
        /// </exception>
        public bool Delete(string key, string[] messageIds)
        {
            RequireText(key, nameof(key));
            if (messageIds == null || messageIds.Length <= 0)
            {
                throw new ArgumentNullException(nameof(messageIds));
            }

            var ids = messageIds.Select(a => new RedisValue(a)).ToArray();
            var database = redisConnection.Data(redisConnection.DataBase);
            return database.StreamDelete(key, ids) > 0;
        }

        /// <summary>
        /// Asynchronously deletes the given messages from a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="messageIds">Ids of the messages to delete; must contain at least one id.</param>
        /// <param name="cancellationToken">Checked once before the command is sent.</param>
        /// <returns><c>true</c> when the deleted count reported by Redis is greater than zero.</returns>
        /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="key"/> is null, empty or whitespace, or <paramref name="messageIds"/> is null or empty.
        /// </exception>
        public async Task<bool> DeleteAsync(string key, string[] messageIds, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            RequireText(key, nameof(key));
            if (messageIds == null || messageIds.Length <= 0)
            {
                throw new ArgumentNullException(nameof(messageIds));
            }

            var ids = messageIds.Select(a => new RedisValue(a)).ToArray();
            var database = redisConnection.Data(redisConnection.DataBase);
            var deleted = await database.StreamDeleteAsync(key, ids).ConfigureAwait(false);
            return deleted > 0;
        }

        /// <summary>
        /// Lists the consumer groups of a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <returns>
        /// One entry per consumer group, or <c>null</c> when the lookup returned nothing
        /// or threw.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="key"/> is null, empty or whitespace.</exception>
        public ConsumerGroupInfo[] GetConsumerGroupInfo(string key)
        {
            RequireText(key, nameof(key));

            try
            {
                var groups = redisConnection.Data(redisConnection.DataBase).StreamGroupInfo(key);
                return groups?.Select(a => new Naruto.Redis.Object.ConsumerGroupInfo
                {
                    ConsumerCount = a.ConsumerCount,
                    LastDeliveredId = a.LastDeliveredId,
                    Name = a.Name,
                    PendingMessageCount = a.PendingMessageCount
                }).ToArray();
            }
            catch (Exception)
            {
                // Deliberate best-effort: CreateConsumerGroup treats null as "no groups yet".
                // NOTE(review): presumably meant for the error Redis raises when the stream
                // key does not exist, but this also hides connection failures - confirm.
                return default;
            }
        }

        /// <summary>
        /// Asynchronously lists the consumer groups of a stream.
        /// </summary>
        /// <param name="key">Stream key.</param>
        /// <param name="cancellationToken">Checked once before the command is sent.</param>
        /// <returns>
        /// One entry per consumer group, or <c>null</c> when the lookup returned nothing
        /// or threw.
        /// </returns>
        /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="key"/> is null, empty or whitespace.</exception>
        public async Task<ConsumerGroupInfo[]> GetConsumerGroupInfoAsync(string key, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            RequireText(key, nameof(key));

            try
            {
                var groups = await redisConnection.Data(redisConnection.DataBase).StreamGroupInfoAsync(key).ConfigureAwait(false);
                return groups?.Select(a => new Naruto.Redis.Object.ConsumerGroupInfo
                {
                    ConsumerCount = a.ConsumerCount,
                    LastDeliveredId = a.LastDeliveredId,
                    Name = a.Name,
                    PendingMessageCount = a.PendingMessageCount
                }).ToArray();
            }
            catch (Exception)
            {
                // Deliberate best-effort: CreateConsumerGroupAsync treats null as "no groups yet".
                // NOTE(review): presumably meant for the error Redis raises when the stream
                // key does not exist, but this also hides connection failures - confirm.
                return default;
            }
        }

        /// <summary>
        /// Reads messages from one or more streams on behalf of a consumer in a group.
        /// </summary>
        /// <param name="streamPositions">Streams and positions to read from; must contain at least one item.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <param name="consumerName">Name of the consumer within the group.</param>
        /// <param name="countPerStream">Forwarded unchanged to StackExchange.Redis.</param>
        /// <param name="noAck">Forwarded unchanged to StackExchange.Redis.</param>
        /// <returns>The streams returned by the underlying read command.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="streamPositions"/> is null or empty, or a name argument is null, empty or whitespace.
        /// </exception>
        public StackExchange.Redis.RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, string groupName, string consumerName, int? countPerStream = null, bool noAck = false)
        {
            // An empty array is rejected here, same as the empty messageIds check in Delete.
            if (streamPositions == null || streamPositions.Length <= 0)
            {
                throw new ArgumentNullException(nameof(streamPositions));
            }
            RequireText(groupName, nameof(groupName));
            RequireText(consumerName, nameof(consumerName));

            var database = redisConnection.Data(redisConnection.DataBase);
            return database.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck);
        }

        /// <summary>
        /// Asynchronously reads messages from one or more streams on behalf of a consumer in a group.
        /// </summary>
        /// <param name="streamPositions">Streams and positions to read from; must contain at least one item.</param>
        /// <param name="groupName">Consumer group name.</param>
        /// <param name="consumerName">Name of the consumer within the group.</param>
        /// <param name="countPerStream">Forwarded unchanged to StackExchange.Redis.</param>
        /// <param name="noAck">Forwarded unchanged to StackExchange.Redis.</param>
        /// <param name="cancellationToken">Checked once before the command is sent.</param>
        /// <returns>The streams returned by the underlying read command.</returns>
        /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="streamPositions"/> is null or empty, or a name argument is null, empty or whitespace.
        /// </exception>
        public async Task<StackExchange.Redis.RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, string groupName, string consumerName, int? countPerStream = null, bool noAck = false, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // An empty array is rejected here, same as the empty messageIds check in DeleteAsync.
            if (streamPositions == null || streamPositions.Length <= 0)
            {
                throw new ArgumentNullException(nameof(streamPositions));
            }
            RequireText(groupName, nameof(groupName));
            RequireText(consumerName, nameof(consumerName));

            var database = redisConnection.Data(redisConnection.DataBase);
            return await database.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck).ConfigureAwait(false);
        }
    }
}
